package com.stars.easyms.schedule.mq;

import com.stars.easyms.schedule.client.DistributedTaskExecutionResultWrapper;
import com.stars.easyms.schedule.core.DistributedScheduleManager;
import com.stars.easyms.schedule.bean.DbScheduleSubTask;
import com.stars.easyms.schedule.bean.DbScheduleSubTaskRecord;
import com.stars.easyms.schedule.enums.TaskStatus;
import com.stars.easyms.schedule.record.AsynUpdateSubTaskRecord;
import com.stars.easyms.schedule.service.DistributedScheduleService;
import com.stars.easyms.schedule.util.DateUtil;
import com.stars.easyms.schedule.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;

/**
 * 抽象MQListener
 *
 * @author guoguifang
 */
public abstract class AbstractMQListener implements MQListener {
    
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    protected void updateSubTaskStatus(DistributedTaskExecutionResultWrapper distributedTaskExecutionResultWrapper) {
        DbScheduleSubTask executableSubTask = new DbScheduleSubTask();
        executableSubTask.setId(distributedTaskExecutionResultWrapper.getId());
        executableSubTask.setVersion(distributedTaskExecutionResultWrapper.getVersion());
        if (distributedTaskExecutionResultWrapper.isExecutionSuccess()) {
            executableSubTask.setStatus(TaskStatus.COMPLETE.getCode());
        } else {
            executableSubTask.setErrorMessage(distributedTaskExecutionResultWrapper.getErrorMsg());
            executableSubTask.setStatus(TaskStatus.ERROR.getCode());
        }
        Timestamp finishTime = new Timestamp(System.currentTimeMillis());
        int updateCount = DistributedScheduleService.getInstance().updateDbScheduleSubTaskById(executableSubTask);
        String executableSubTaskName = StringUtils.join(new Object[]{distributedTaskExecutionResultWrapper.getTaskId(), distributedTaskExecutionResultWrapper.getBatchNo(), distributedTaskExecutionResultWrapper.getPartitionCount(), distributedTaskExecutionResultWrapper.getPartitionIndex()}, "-");
        if (updateCount > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("The distributed task [{}] execute completely, status change for --> '{}', end time：{}!", executableSubTaskName, TaskStatus.forCode(executableSubTask.getStatus()), DateUtil.getDateStrByMillisecond(finishTime));
            }
            // 异步记录子任务处理信息
            DbScheduleSubTaskRecord dbScheduleSubTaskRecord = new DbScheduleSubTaskRecord();
            dbScheduleSubTaskRecord.setSubTaskId(distributedTaskExecutionResultWrapper.getId());
            dbScheduleSubTaskRecord.setBatchNo(distributedTaskExecutionResultWrapper.getBatchNo());
            dbScheduleSubTaskRecord.setFinishTime(finishTime);
            dbScheduleSubTaskRecord.setStatus(executableSubTask.getStatus());
            AsynUpdateSubTaskRecord.add(dbScheduleSubTaskRecord);

            // 当子任务执行完毕后需要通知调度器更新主任务状态
            DistributedScheduleManager.getSingleInstance().signalIdleScheduler();
        } else {
            logger.error("The distributed task [{}] execute completely, but the update database failed because of a database throwable or the task has been executed by other threads!", executableSubTaskName);
        }
    }

}
